import pandas, numpy, dask, bokeh, numba, scipy, prefect, intake
import dask.dataframe as dd
from dask.distributed import Client
import hvplot.pandas
import hvplot.dask
import os, struct, time, tqdm.notebook, dask.diagnostics
# Address of a remote dask scheduler.
# NOTE(review): ``server``/``port`` are never used anywhere in this file —
# ``Client()`` below starts a local cluster instead; to connect remotely it
# would need Client(f"tcp://{server}:{port}"). Confirm which is intended.
server,port = "192.168.1.90","8786"
client = Client()#(n_workers=1,processes=True,threads_per_worker=4, memory_limit=0.5)
# Give the scheduler/workers a moment to come up before submitting work.
time.sleep(10)
00:02:56.271 | INFO | distributed.scheduler - State start
00:02:56.277 | INFO | distributed.scheduler - Scheduler at: tcp://127.0.0.1:37667
00:02:56.279 | INFO | distributed.scheduler - dashboard at: 127.0.0.1:8787
00:02:56.290 | INFO | distributed.nanny - Start Nanny at: 'tcp://127.0.0.1:46199'
00:02:56.294 | INFO | distributed.nanny - Start Nanny at: 'tcp://127.0.0.1:39825'
00:02:56.299 | INFO | distributed.nanny - Start Nanny at: 'tcp://127.0.0.1:33691'
00:02:56.302 | INFO | distributed.nanny - Start Nanny at: 'tcp://127.0.0.1:46311'
00:02:56.938 | INFO | distributed.scheduler - Register worker <WorkerState 'tcp://127.0.0.1:36801', name: 0, status: init, memory: 0, processing: 0>
00:02:56.941 | INFO | distributed.scheduler - Starting worker compute stream, tcp://127.0.0.1:36801
00:02:56.943 | INFO | distributed.core - Starting established connection to tcp://127.0.0.1:57640
00:02:56.945 | INFO | distributed.scheduler - Register worker <WorkerState 'tcp://127.0.0.1:39953', name: 1, status: init, memory: 0, processing: 0>
00:02:56.947 | INFO | distributed.scheduler - Starting worker compute stream, tcp://127.0.0.1:39953
00:02:56.948 | INFO | distributed.core - Starting established connection to tcp://127.0.0.1:57650
00:02:56.951 | INFO | distributed.scheduler - Register worker <WorkerState 'tcp://127.0.0.1:39457', name: 3, status: init, memory: 0, processing: 0>
00:02:56.953 | INFO | distributed.scheduler - Starting worker compute stream, tcp://127.0.0.1:39457
00:02:56.955 | INFO | distributed.core - Starting established connection to tcp://127.0.0.1:57666
00:02:56.957 | INFO | distributed.scheduler - Register worker <WorkerState 'tcp://127.0.0.1:34841', name: 2, status: init, memory: 0, processing: 0>
00:02:56.959 | INFO | distributed.scheduler - Starting worker compute stream, tcp://127.0.0.1:34841
00:02:56.960 | INFO | distributed.core - Starting established connection to tcp://127.0.0.1:57680
00:02:56.973 | INFO | distributed.scheduler - Receive client connection: Client-3f3f53e7-f36c-11ed-93b0-b4b5b6a4f425
00:02:56.975 | INFO | distributed.core - Starting established connection to tcp://127.0.0.1:57682
@numba.njit()
def detector_from_pixel(pixel_id):
    """Map a raw pixel id to its detector (crystal) number.

    Pixel ids live in 131072-wide banks; the second and third banks carry
    an extra offset (1024 and 2048 respectively) before the
    128-pixels-per-detector division.
    """
    bank = pixel_id // 131072
    if bank >= 2:
        return (pixel_id % (131072 * 2) + 2048) // 128
    if bank == 1:
        return (pixel_id % 131072 + 1024) // 128
    return pixel_id // 128
# One row per active detector id: (detector_id, iTED, crystal, raw-pixel offset).
# Detector ids 9 and 17 are absent, matching the hardware mapping in use.
_DECONV_ROWS = (
    (0, 1, 0, 0),
    (1, 1, 1, 128),
    (2, 1, 2, 256),
    (3, 1, 3, 384),
    (4, 1, 4, 512),
    (5, 2, 0, 640),
    (6, 2, 1, 768),
    (7, 2, 2, 896),
    (8, 3, 0, 131072),
    (10, 3, 1, 131328),
    (11, 3, 2, 131456),
    (12, 3, 3, 131584),
    (13, 3, 4, 131712),
    (14, 2, 3, 131840),
    (15, 2, 4, 131968),
    (16, 4, 0, 262144),
    (18, 4, 1, 262400),
    (19, 4, 2, 262528),
    (20, 4, 3, 262656),
    (21, 4, 4, 262784),
)
deconvolution_dict = {
    det: {"iTED": ited, "Crystal": crystal, "Offset": offset}
    for det, ited, crystal, offset in _DECONV_ROWS
}
# Same mapping as a DataFrame (rows = detector ids, cols = iTED/Crystal/Offset).
deconvolution_df = pandas.DataFrame(deconvolution_dict).T
def deconvolute(x):
    """Return the detector (crystal) id for raw pixel id ``x``.

    Same bank arithmetic as ``detector_from_pixel`` above, but as a plain
    Python function usable from pandas/dask ``map``.  Rewritten from an
    assigned lambda to a ``def`` (PEP 8 E731); behavior is unchanged.
    """
    if x >= 131072 * 2:
        return (x % (131072 * 2) + 2048) // 128
    if x >= 131072:
        return (x % 131072 + 1024) // 128
    return x // 128
# Master map: one row per crystal; the ``pixel_map`` column holds a small
# DataFrame read from the per-crystal map file named in the master file.
pixel_maps = pandas.read_csv(
    "../../data/DetectorMaps/DETECTOR_MAPS",
    header=None,
    sep=r"\s+",  # FIX: delim_whitespace= is deprecated (removed in pandas 3.0)
    usecols=(1, 2),
    names=("crystal_id", "pixel_map"),
    converters={
        # s[2:] drops the first two characters of the stored path —
        # presumably a leading "./"; TODO confirm against the map file.
        "pixel_map": lambda s: pandas.read_csv(
            f"../../data/DetectorMaps/{s[2:]}",
            header=None,
            sep=r"\s+",
        )
    },
)
#pixel_maps.query('crystal_id == 6').pixel_map.item().stack()
#.to_frame("pixel_id").query("pixel_id == 6").index.item()
# Background runs taken at different discriminator threshold settings.
# Each entry holds the raw singles file, the acquisition time in seconds,
# and a "dd" slot filled later with the per-pixel count-rate table.
# (Runs for th 5 and 6 were present but commented out in the original.)
_DATA = "/run/media/bgameiro/d043b5e4-57a4-457e-8839-cb1adc9c72bc//Data"
_RUN_FILES = (
    (7, "data_2023_05_05/background_D.2023_05_05_T.13_26_48_C.itedABCD_lab_2023.02.22_4.0v_887_120s.singles.ldat"),
    (8, "data_2023_05_05/background_D.2023_05_05_T.13_14_11_C.itedABCD_lab_2023.02.22_4.0v_888_120s.singles.ldat"),
    (9, "data_2023_05_05/background_D.2023_05_05_T.13_16_27_C.itedABCD_lab_2023.02.22_4.0v_889_120s.singles.ldat"),
    (10, "data_2023_05_05/background_D.2023_05_05_T.13_24_05_C.itedABCD_lab_2023.02.22_4.0v_8810_120s.singles.ldat"),
    (11, "data_2023_05_05/background_D.2023_05_05_T.13_32_37_C.itedABCD_lab_2023.02.22_4.0v_8811_120s.singles.ldat"),
    (12, "data_2023_05_05/background_D.2023_05_05_T.13_21_44_C.itedABCD_lab_2023.02.22_4.0v_8812_120s.singles.ldat"),
    (13, "data_2023_05_08/background_D.2023_05_08_T.14_16_43_C.itedABCD_lab_2023.02.22_4.0v_8813_120s.singles.ldat"),
    (0, "data_2023_05_08/background_D.2023_05_08_T.15_38_52_C.itedABCD_lab_custom_2023.02.22_4.0v_887_120s.singles.ldat"),
)
th_dict = {
    th: {"path": f"{_DATA}/{rel}", "time": 120, "dd": None}
    for th, rel in _RUN_FILES
}
def get_file(file_name):
    """Load a raw .ldat singles file as a structured numpy array.

    Each 16-byte record is: int64 timestamp, float32 energy, int32 pixel id.
    """
    record = numpy.dtype(
        [("time", numpy.int64), ("energy", numpy.float32), ("id", numpy.int32)]
    )
    return numpy.fromfile(file_name, dtype=record)
def get_entry_from_binary3(file_np):
    """Wrap an already-loaded structured singles array in a dask DataFrame."""
    record = numpy.dtype(
        [("time", numpy.int64), ("energy", numpy.float32), ("id", numpy.int32)]
    )
    # NOTE(review): the original used a local named ``dd``, shadowing the
    # module alias from ``import dask.dataframe as dd`` — renamed here.
    frame = dask.dataframe.from_array(file_np, columns=record.names)
    return frame
def get_entry_from_binary4(file_np):
    """Convert a structured singles array to an in-memory pandas DataFrame."""
    record = numpy.dtype(
        [("time", numpy.int64), ("energy", numpy.float32), ("id", numpy.int32)]
    )
    return pandas.DataFrame(file_np, columns=record.names)
@dask.delayed
def get_entry_from_binary(file_name, chunk_size=8+4+4):
    """Yield singles records one at a time from a raw .ldat file.

    Each record is unpacked as (int64, float32, int32) into a dict with
    keys time/energy/id.

    NOTE(review): wrapping a *generator* function in ``dask.delayed`` is
    unusual — the delayed call yields a generator object, not materialized
    rows; confirm this is intended (it appears unused below).
    """
    fields = ("time", "energy", "id")
    with open(file_name, "rb") as handle:
        while (raw := handle.read(chunk_size)):
            yield dict(zip(fields, struct.unpack("qfi", raw)))
#file = "../../data/Multi_iTED_characterization/test_D.2023_03_31_T.16_26_55_C.itedABCD_lab_custom_2023.02.22_4.0v_887_5s.singles.ldat"
# For every threshold run: load the singles file, count hits per pixel id,
# and normalize by acquisition time to get a rate table tagged with th/id.
with tqdm.notebook.tqdm(total=len(th_dict)) as pbar1, tqdm.notebook.tqdm(total=5) as pbar2:
    for i in th_dict:
        pbar1.set_description(f"Processing th={i:2}")
        # BUG FIX: the inner bar was never reset between files and received
        # six update(1) calls per iteration against total=5.
        pbar2.reset()
        pbar2.set_description("Entries")
        entries_df = get_entry_from_binary4(get_file(th_dict[i]["path"]))
        pbar2.update(1)
        pbar2.set_description("Counts")
        th_dict[i]["dd"] = entries_df.id\
            .value_counts()\
            .to_frame(name="counts")
        pbar2.update(1)
        pbar2.set_description("Normalize")
        # counts -> rate in counts/second over the run duration
        th_dict[i]["dd"].counts /= th_dict[i]["time"]
        pbar2.update(1)
        pbar2.set_description("ID th")
        th_dict[i]["dd"]["th"] = i
        pbar2.update(1)
        pbar2.set_description("ID pixel")
        # keep the pixel id both as index and as an explicit column
        th_dict[i]["dd"]["id"] = th_dict[i]["dd"].index
        pbar2.update(1)
        pbar1.update(1)
# Stack all per-threshold rate tables into one dask frame and derive the
# detector-geometry columns from the raw pixel id.
# NOTE(review): this rebinds ``dd``, shadowing the module alias created by
# ``import dask.dataframe as dd`` at the top of the file.
dd = dask.dataframe.concat([th_dict[i]["dd"] for i in th_dict], ignore_order=True)
dd["crystal_id"] = dd.id.map(deconvolute, meta=("x", "int8"))
dd["iTED"] = dd.crystal_id.map(lambda x: deconvolution_dict[x]["iTED"], meta=("x", "int8"))
dd["crystal"] = dd.crystal_id.map(lambda x: deconvolution_dict[x]["Crystal"], meta=("x", "int8"))
# in-crystal pixel number = raw id minus the crystal's base offset
dd["pixel"] = dd.id.map(lambda x: x - deconvolution_dict[deconvolute(x)]["Offset"], meta=("x", "int8"))
# Locate each pixel in its crystal's pixel map: stack() turns the map into
# (row, col) -> pixel_id, then we look up the index tuple whose value
# matches this pixel number.
dd["coordinates"] = dd.apply(
    lambda entry: pixel_maps.query(f'crystal_id == {entry.crystal_id}')
    .pixel_map
    .item()
    .stack()
    .to_frame("pixel_id")
    .query(f"pixel_id == {entry.pixel}")
    .index
    .item(),
    axis=1,
    meta=(None, 'object')
)
# coordinates is the stacked-map index tuple: element 0 -> y, element 1 -> x
dd['x'] = dd.coordinates.map(lambda x: x[1], meta=("x", "int8"))
dd['y'] = dd.coordinates.map(lambda x: x[0], meta=("x", "int8"))
0%| | 0/8 [00:00<?, ?it/s]
0%| | 0/5 [00:00<?, ?it/s]
#import pickle
#with open('dict.pickle', 'wb') as handle:
# pickle.dump(th_dict, handle, protocol=pickle.HIGHEST_PROTOCOL)
#import pickle
#with open('dict.pickle', 'rb') as handle:
# th_dict = pickle.load(handle)
# Per-threshold distribution of pixel rates, then total rate per threshold.
dd.hvplot.box('counts', by='th')
dd.groupby("th").counts.sum().compute()
th 7 957088.558333 8 516637.916667 9 354710.716667 10 380974.108333 11 188903.508333 12 334494.316667 13 338020.658333 0 432808.200000 Name: counts, dtype: float64
# Ten busiest pixels across all thresholds (rate in counts/s).
dd.counts.nlargest(10).compute()
131640 137478.933333 262173 50997.333333 262173 48940.266667 262173 45969.016667 131640 44685.250000 262173 42180.158333 546 41194.841667 131615 26564.758333 131625 22233.108333 262172 20718.225000 Name: counts, dtype: float64
# Build the working table from the base threshold (th == 7); then, for each
# threshold and crystal, overwrite the rows of pixels that are still "noisy"
# (rate above 5x their crystal's median base rate) with the values measured
# at that threshold.
with dask.diagnostics.ProgressBar():
    # NOTE(review): ``global`` at module level is a no-op (notebook artifact).
    global ddd
    ddd = dd.query('th == 7').compute() # Base threshold
ddd = ddd.set_index('id')
# Noise cutoff per crystal: 5x the median base-threshold rate.
threshold_counts = ddd.groupby("crystal_id").counts.median()*5
with tqdm.notebook.tqdm(total=len(th_dict)) as pbar1, tqdm.notebook.tqdm(total=len(threshold_counts)) as pbar2:
    for th in th_dict:
        pbar2.reset()
        pbar1.set_description( f"Threshold: {th:2}")
        for crystal in ddd.crystal_id.unique():
            pbar2.set_description(f"Crystal: {int(crystal):4}")
            # Select, at this threshold, the ids currently above the cutoff
            # and replace their rows in the working table in place.
            ddd.update(
                dd.query(f"th == {th} & id in {list(ddd.query('counts > @threshold_counts[@crystal]').index.values)}")\
                .compute()\
                .set_index('id')
            )
            pbar2.update(1)
        pbar1.update(1)
0%| | 0/8 [00:00<?, ?it/s]
0%| | 0/20 [00:00<?, ?it/s]
# Re-expose the index as a column and summarize the merged rate table.
ddd["id"] = ddd.index
print("Total sum:")
print(ddd.counts.sum())
print("Most active:")
print(ddd.counts.nlargest(10))
print("Stats:")
ddd.counts.describe(percentiles=[.9,.95])
Total sum: 300438.2166666667 Most active: id 262173 369.400000 131350 311.691667 131484 311.566667 262797 311.458333 549 311.441667 131770 311.441667 262458 311.433333 262837 311.425000 262411 311.333333 262812 311.291667 Name: counts, dtype: float64 Stats:
count 1280.000000 mean 234.717357 std 69.530743 min 0.150000 50% 261.429167 90% 303.555833 95% 307.600000 max 369.400000 Name: counts, dtype: float64
# Rate distribution per crystal after threshold merging.
ddd.hvplot.box('counts', by='crystal_id')
# Crystal to see
# Interactive per-crystal heatmap of pixel rates on the (x, y) pixel map.
ddd.hvplot.heatmap(
    x='x', y='y', C='counts', groupby='crystal_id', by='crystal_id',
    height=500, width=500, colorbar=False,
    hover_cols=["id","th"]
)
# How many pixels ended up at each final threshold.
ddd.th.value_counts()
7.0 884 9.0 258 11.0 100 8.0 33 10.0 4 0.0 1 Name: th, dtype: int64
# ---------------------------------------------------------------------------
# The remaining cells are DISABLED: kept as bare triple-quoted strings so
# they do not execute. String contents are preserved verbatim.
# ---------------------------------------------------------------------------
# Disabled: export of per-channel discriminator settings (portID/slaveID/
# chipID/channelID/vth_*) derived from the merged thresholds.
"""
improved_threshold=ddd[["th","id"]].astype("int32")#.query("th != 7")#.to_csv("thresholds.csv",index=False)
improved_threshold["detector"] = improved_threshold.id.map(detector_from_pixel)
improved_threshold["vth_t1"] = 8
improved_threshold["vth_t2"] = 8
improved_threshold["channelID"] = improved_threshold.id.map(lambda x: x-deconvolution_dict[detector_from_pixel(x)]["Offset"])
improved_threshold["slaveID"] = 0
improved_threshold["#portID"] = improved_threshold.id // 131072
improved_threshold["chipID"] = (improved_threshold.id - improved_threshold["#portID"] * 131072) // 64
improved_threshold.rename(columns={"th": "vth_e"},inplace=True)
"""
#improved_threshold.to_csv('disc_settings.tsv', sep="\t", columns=["#portID","slaveID","chipID","channelID","vth_t1","vth_t2","vth_e"],index=False)
# Disabled: manual per-pixel threshold overrides (first pass).
"""
ddd.update(
dask.dataframe.concat([
# List changes
dd.query('th == 9' ).loc[131640],
dd.query('th == 11').loc[262173],
dd.query('th == 9' ).loc[546],
dd.query('th == 8' ).loc[131615],
dd.query('th == 8' ).loc[131625],
dd.query('th == 8' ).loc[262172],
dd.query('th == 9' ).loc[802],
dd.query('th == 8' ).loc[131390],
dd.query('th == 8' ).loc[262155],
dd.query('th == 8' ).loc[131624],
dd.query('th == 8' ).loc[131638],
dd.query('th == 8' ).loc[131360],
dd.query('th == 8' ).loc[131858],
dd.query('th == 8' ).loc[131614],
]).compute().set_index('id')
)
"""
# Disabled: manual per-pixel threshold overrides (second pass, grouped by
# crystal) plus a recount.
"""
ddd.update(
dask.dataframe.concat([
# List changes
#0
dd.query('th == 8').loc[39],
dd.query('th == 8').loc[53],
dd.query('th == 8').loc[31],
dd.query('th == 8').loc[37],
dd.query('th == 8').loc[34],
dd.query('th == 8').loc[ 5],
dd.query('th == 9').loc[ 8],
dd.query('th == 8').loc[30],
dd.query('th == 8').loc[24],
dd.query('th == 8').loc[21],
dd.query('th == 8').loc[19],
dd.query('th == 8').loc[ 7],
#1
dd.query('th == 8').loc[168],
dd.query('th == 8').loc[185],
#6
dd.query('th == 9').loc[774],
dd.query('th == 9').loc[769],
dd.query('th == 8').loc[796],
dd.query('th == 8').loc[810],
dd.query('th == 8').loc[801],
dd.query('th == 10').loc[802],
]).compute().set_index('id')
)
ddd["id"] = ddd.index
ddd.counts.sum()
"""
# Disabled: single-run (th_dict[0]) reprocessing with eager pandas versions
# of the feature-engineering steps above.
"""
entries_df = get_entry_from_binary4(get_file(th_dict[0]["path"]))
th_dict[0]["dd"] = entries_df.id\
.value_counts()\
.to_frame(name="counts")
th_dict[0]["dd"].counts /= th_dict[0]["time"]
th_dict[0]["dd"]["id"] = th_dict[0]["dd"].index
th_dict[0]["dd"]["crystal_id"] = th_dict[0]["dd"].id.map(deconvolute)
th_dict[0]["dd"]["iTED"] = th_dict[0]["dd"].crystal_id.map(lambda x: deconvolution_dict[x]["iTED"])
th_dict[0]["dd"]["crystal"] = th_dict[0]["dd"].crystal_id.map(lambda x: deconvolution_dict[x]["Crystal"])
th_dict[0]["dd"]["pixel"] = th_dict[0]["dd"].id.map(lambda x: x - deconvolution_dict[deconvolute(x)]["Offset"])
th_dict[0]["dd"]["coordinates"] = th_dict[0]["dd"].apply(
lambda entry: pixel_maps.query(f'crystal_id == {entry.crystal_id}')
.pixel_map
.item()
.stack()
.to_frame("pixel_id")
.query(f"pixel_id == {entry.pixel}")
.index
.item(),
axis=1,
)
th_dict[0]["dd"]['x'] = th_dict[0]["dd"].coordinates.map(lambda x: x[1])
th_dict[0]["dd"]['y'] = th_dict[0]["dd"].coordinates.map(lambda x: x[0])
"""